/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.loadtests;



import com.bff.gaia.unified.sdk.io.synthetic.SyntheticStep;

import com.bff.gaia.unified.sdk.options.Default;

import com.bff.gaia.unified.sdk.options.Description;

import com.bff.gaia.unified.sdk.testutils.metrics.ByteMonitor;

import com.bff.gaia.unified.sdk.testutils.metrics.TimeMonitor;

import com.bff.gaia.unified.sdk.transforms.*;

import com.bff.gaia.unified.sdk.values.KV;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions;

import com.bff.gaia.unified.sdk.transforms.Combine;

import com.bff.gaia.unified.sdk.transforms.Count;

import com.bff.gaia.unified.sdk.transforms.MapElements;

import com.bff.gaia.unified.sdk.transforms.Mean;

import com.bff.gaia.unified.sdk.transforms.PTransform;

import com.bff.gaia.unified.sdk.transforms.ParDo;

import com.bff.gaia.unified.sdk.transforms.SimpleFunction;

import com.bff.gaia.unified.sdk.transforms.Sum;

import com.bff.gaia.unified.sdk.transforms.Top;



import java.io.IOException;

import java.math.BigInteger;

import java.util.Optional;



import static java.lang.String.format;



/**

 * Load test for {@link ParDo} operation.

 *

 * <p>The purpose of this test is to measure {@link Combine}'s behaviour in stressful conditions. It

 * uses synthetic sources and {@link SyntheticStep} which both can be parametrized to generate keys

 * and values of various size, impose delay (sleep or cpu burnout) in various moments during the

 * pipeline execution and provide some other performance challenges.

 *

 * <p>You can choose between multiple combine modes to test per key combine operations ({@link

 * CombinerType}).

 *

 * <p>To run it manually, use the following command:

 *

 * <pre>

 *    ./gradlew :unified-sdks-java-load-tests:run -PloadTest.args='

 *      --fanout=1

 *      --perKeyCombinerType=TOP_LARGEST

 *      --topCount=10

 *      --sourceOptions={"numRecords":1000,...}

 *      --stepOptions={"outputRecordsPerInputRecord":2...}'

 *      -PloadTest.mainClass="com.bff.gaia.unified.sdk.loadtests.CombineLoadTest"

 * </pre>

 */

public class CombineLoadTest extends LoadTest<CombineLoadTest.Options> {



  private static final String METRICS_NAMESPACE = "combine";



  /** Enumerates per-key combiners available in the test. */

  public enum CombinerType {

    TOP_LARGEST,

    MEAN,

    SUM,

    COUNT

  }



  /** Pipeline options specific for this test. */

  public interface Options extends LoadTestOptions {



    @Description("Number consequent of ParDo operations (SyntheticSteps) to be performed.")

    @Default.Integer(1)

    Integer getNumberOfCounterOperations();



    void setNumberOfCounterOperations(Integer count);



    @Description("The number of Combine operations to perform in parallel.")

    @Default.Integer(1)

    Integer getFanout();



    void setFanout(Integer fanout);



    @Description("Per key combiner type.")

    @Default.Enum("MEAN")

    CombinerType getPerKeyCombiner();



    void setPerKeyCombiner(CombinerType combinerType);



    @Description("Number of top results to combine (if applicable).")

    Integer getTopCount();



    void setTopCount(Integer topCount);



    @Description("Number of reiterations over the values to perform.")

    @Default.Integer(1)

    Integer getIterations();



    void setIterations(Integer iterations);

  }



  private CombineLoadTest(String[] args) throws IOException {

    super(args, Options.class, METRICS_NAMESPACE);

  }



  @Override

  protected void loadTest() throws IOException {

    Optional<SyntheticStep> syntheticStep = createStep(options.getStepOptions());



    PCollection<KV<byte[], byte[]>> input =

        pipeline

            .apply("Read input", readFromSource(sourceOptions))

            .apply(

                "Collect start time metric",

                ParDo.of(new TimeMonitor<>(METRICS_NAMESPACE, "runtime")))

            .apply(

                "Collect metrics",

                ParDo.of(new ByteMonitor(METRICS_NAMESPACE, "totalBytes.count")));



    input = applyWindowing(input);



    for (int i = 0; i < options.getFanout(); i++) {

      applyStepIfPresent(input, format("Step: %d", i), syntheticStep)

          .apply(format("Convert to Long: %d", i), MapElements.via(new ByteValueToLong()))

          .apply(format("Combine: %d", i), getPerKeyCombiner(options.getPerKeyCombiner()))

          .apply(

              "Collect end time metric", ParDo.of(new TimeMonitor<>(METRICS_NAMESPACE, "runtime")));

    }

  }



  public PTransform<PCollection<KV<byte[], Long>>, ? extends PCollection> getPerKeyCombiner(

      CombinerType combinerType) {

    switch (combinerType) {

      case MEAN:

        return Mean.perKey();

      case TOP_LARGEST:

        Preconditions.checkArgument(

            options.getTopCount() != null,

            "You should set \"--topCount\" option to use TOP combiners.");

        return Top.largestPerKey(options.getTopCount());

      case SUM:

        return Sum.longsPerKey();

      case COUNT:

        return Count.perKey();

      default:

        throw new IllegalArgumentException("No such combiner!");

    }

  }



  private static class ByteValueToLong

      extends SimpleFunction<KV<byte[], byte[]>, KV<byte[], Long>> {



    @Override

    public KV<byte[], Long> apply(KV<byte[], byte[]> input) {

      return KV.of(input.getKey(), new BigInteger(input.getValue()).longValue());

    }

  }



  public static void main(String[] args) throws IOException {

    new CombineLoadTest(args).run();

  }

}